package com.kuainiu.beidou.engine.component.core

import scala.collection.JavaConverters._
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.commons.lang3.StringUtils
import com.kuainiu.beidou.engine.component.exception.LoadDataException
import com.kuainiu.beidou.engine.component.domain.{DataInfo, DataInfos, DataProtocol, DataProtocols, FileType, SampleInfo, StoreEngine}

//object DataLoad {
//
//  def apply(): DataLoad = new DataLoad()

//  def main(args: Array[String]): Unit = {
//
//    val dataProtocols = new DataProtocols()
//    dataProtocols.add(new DataProtocol(null, null))
//    dataProtocols.add(new DataProtocol(null, null))
//
//    new DataLoad().loadDataInfos(null, dataProtocols)
//  }

//}

object DataLoad {

  /**
   * 根据协议进行数据加载
   * @param spark SparkSession
   * @param dataProtocols 数据协议信息
   * @return  DataInfos
   */
  def loadDataInfos(spark: SparkSession, dataProtocols: DataProtocols): DataInfos ={
    //输出对象
    val dataInfos = new DataInfos()
    //遍历处理数据
    dataProtocols.asScala.foreach(dataProtocol => {
      //加载数据信息
      val dataFrame = {
          dataProtocol.storeInfo.storeEngine match {
          case StoreEngine.HIVE =>
            this.loadHiveData(spark, dataProtocol)
          case StoreEngine.HDFS =>
            this.loadHdfsData(spark, dataProtocol)
          case StoreEngine.LOCAL =>
            throw new LoadDataException("暂不支持的存储引擎 local 模式")
          case _ =>
            throw new LoadDataException(s"暂不支持的存储引擎${dataProtocol.storeInfo.storeEngine}")
        }
      }
      //对Sample信息进行标准话处理
      val sampleFrame = this.dealSampleInfo(dataFrame, dataProtocol.sampleInfo)
      //对schema进行标准话处理

      //过滤当前选择的列

      //返回标准化的数据
      dataInfos.add(new DataInfo(sampleFrame, sampleFrame.schema))
    })
    //返回数据信息
    dataInfos
  }

  /**
   * 加载HIVE表中的数据信息
   * @param spark SparkSession
   * @param dataProtocol  数据协议信息
   * @return
   */
  private def loadHiveData(spark: SparkSession, dataProtocol: DataProtocol) : DataFrame ={
    //获取到hive的数据库
    val dataBase = dataProtocol.storeInfo.db
    //若协议中没有数据库信息则抛出错误
    if (StringUtils.isBlank(dataBase)) throw new LoadDataException("加载数据时存储引擎为hive，但是 db 为空，请检查")
    //获取到hive的数据表
    val tableName = dataProtocol.storeInfo.target
    //若协议中没有数据表信息则抛出错误
    if (StringUtils.isBlank(tableName)) throw new LoadDataException("加载数据时存储引擎为hive，但是 target (表名) 为空，请检查")
    //加载HIVE表的数据
    spark.sql(s"select * from ${dataBase}.${tableName}")
  }

  /**
   * 加载HDFS文件数据信息
   * @param spark SparkSession
   * @param dataProtocol  数据协议信息
   * @return
   */
  private def loadHdfsData(spark: SparkSession, dataProtocol: DataProtocol) : DataFrame = {
    //获取数据的目标路径
    val target = dataProtocol.storeInfo.target
    if (null == target || target.trim.isEmpty) throw new LoadDataException("尝试加载hdfs时，发现文件路径为空，请检查")
    //获取数据的类型
    val fileType = dataProtocol.storeInfo.fileType
    //路由
    fileType match {
      case FileType.JSON => this.loadHdfsJson(spark, dataProtocol)
      case FileType.CSV_WITH_HEADER => this.loadHdfsCsv(spark, dataProtocol, true)
      case FileType.CSV_WITHOUT_HEADER => this.loadHdfsCsv(spark, dataProtocol, false)
      case _ => throw new LoadDataException("暂不支持的hdfs文件类型")
    }
  }

  /**
   * 加载HDFS文件数据信息
   * @param spark SparkSession
   * @param dataProtocol  数据协议信息
   * @return
   */
  private def loadHdfsCsv(spark: SparkSession, dataProtocol: DataProtocol, header: Boolean) : DataFrame = {
    spark.read.format("csv").option("header", value = header).option("multiLine", value = true).load(dataProtocol.storeInfo.target)
  }

  /**
   * 加载HDFS文件数据信息
   * @param spark SparkSession
   * @param dataProtocol  数据协议信息
   * @return
   */
  private def loadHdfsJson(spark: SparkSession, dataProtocol: DataProtocol) : DataFrame = {
    spark.read.format("json").load(dataProtocol.storeInfo.target)
  }

  /**
   * 处理样本信息
   * @param dataFrame
   * @param sampleInfo
   * @return
   */
  private def dealSampleInfo(dataFrame: DataFrame, sampleInfo: SampleInfo): DataFrame ={

    dataFrame
  }

}
